package com.fanxuankai.boot.canal.client;

import com.fanxuankai.boot.canal.client.autoconfigure.CanalClientProperties;
import org.springframework.kafka.core.KafkaTemplate;

import javax.annotation.Resource;

/**
 * {@link EntryConsumer} implementation that hands every message carried by a
 * {@link MessageInfo} to Kafka through a {@link KafkaTemplate}.
 *
 * @author fanxuankai
 */
public class KafkaEntryConsumer implements EntryConsumer<MessageInfo> {
    /**
     * Template used to send each message payload.
     */
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;
    /**
     * Client properties handed to every {@link MessageInfo} created by {@link #apply}.
     */
    @Resource
    private CanalClientProperties properties;

    /**
     * Wraps the given entry, together with the client properties, in a new {@link MessageInfo}.
     *
     * @param entryWrapper the entry to convert
     * @return a new {@link MessageInfo} built from {@code entryWrapper} and the client properties
     */
    @Override
    public MessageInfo apply(EntryWrapper entryWrapper) {
        final MessageInfo info = new MessageInfo(entryWrapper, properties);
        return info;
    }

    /**
     * Sends every message of {@code messageInfo} to the topic reported by
     * {@link MessageInfo#getTopic()}.
     *
     * @param messageInfo source of the topic name and the message payloads
     */
    @Override
    public void accept(MessageInfo messageInfo) {
        messageInfo.getMessages().forEach(payload -> sendToTopic(messageInfo, payload));
    }

    /**
     * Sends a single payload to the topic of {@code source}. The value returned by
     * {@link KafkaTemplate#send(String, Object)} is discarded, so the outcome of the
     * send is not inspected here.
     *
     * @param source  message info whose topic is the destination
     * @param payload message body to send
     */
    private void sendToTopic(MessageInfo source, String payload) {
        kafkaTemplate.send(source.getTopic(), payload);
    }
}
